Introduction

There are use cases where we would like to get the first or last value of something within a group, or at a particular grain.

It is natural to do something in SQL like:

select
    col_1,
    first(col_2) as first_something,
    last(col_2) as last_something
from table
group by 1
order by 1

This leads us to write Spark code like df.orderBy().groupBy().agg(). Unfortunately, this pattern has unexpected behaviour in Spark: the ordering is not guaranteed to survive the groupBy(), so the result can be different on each run. A rough sketch of that naive translation is shown below.
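As a sketch only (df here is a hypothetical DataFrame with the col_1/col_2 columns from the SQL above, and F is pyspark.sql.functions, imported in the next section), the naive translation would look like this:

df_naive = (
    df
    .orderBy('col_2')  # hoping this order survives the groupBy (it is not guaranteed to)
    .groupBy('col_1')
    .agg(
        F.first('col_2').alias('first_something'),
        F.last('col_2').alias('last_something'),
    )
)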

Library Imports

from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import functions as F, Window

Create a SparkSession. There is no need to create a SparkContext separately, as you automatically get one as part of the SparkSession.

spark = (
    SparkSession.builder
    .master("local")
    .appName("Exploring Joins")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

sc = spark.sparkContext

Initial Datasets

pets = spark.createDataFrame(
    [
        (1, 1, datetime(2018, 1, 1, 1 ,1, 1), 'Bear', 5),
        (2, 1, datetime(2010, 1, 1, 1 ,1, 1), 'Chewie', 15),
        (3, 1, datetime(2015, 1, 1, 1 ,1, 1), 'Roger', 10),
    ], ['id', 'breed_id', 'birthday', 'nickname', 'age']
)

pets.toPandas()
   id  breed_id            birthday nickname  age
0   1         1 2018-01-01 01:01:01     Bear    5
1   2         1 2010-01-01 01:01:01   Chewie   15
2   3         1 2015-01-01 01:01:01    Roger   10

Option 1: Wrong Way

Result 1

df_1 = (
    pets
    .orderBy('birthday')
    .groupBy('breed_id')
    .agg(F.first('nickname').alias('first_breed'))
)

df_1.toPandas()
   breed_id first_breed
0         1      Chewie

Result 2

df_2 = (
    pets
    .orderBy('birthday')
    .groupBy('breed_id')
    .agg(F.first('nickname').alias('first_breed'))
)

df_2.toPandas()
   breed_id first_breed
0         1      Chewie
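On a single small local partition the "wrong way" can look perfectly stable, which is why the two results above happen to match. As a hedged sketch (the exact output still depends on the run), repartitioning the data first makes the non-determinism much easier to observe:

df_shuffled = (
    pets
    .repartition(8, 'id')    # spread the rows across several partitions
    .orderBy('birthday')
    .groupBy('breed_id')     # the shuffle here does not preserve the sort order
    .agg(F.first('nickname').alias('first_breed'))
)

df_shuffled.toPandas()       # may come back as Bear, Chewie or Roger on different runs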

Option 2: Window Object, Right Way

window = Window.partitionBy('breed_id').orderBy('birthday')

df_3 = (
    pets
    .withColumn(
        'first_breed',
        F.first('nickname').over(
            window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
        )
    )
    .withColumn(
        'rn',
        F.row_number().over(
            window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
        )
    )
)

df_3.toPandas()
   id  breed_id            birthday nickname  age first_breed  rn
0   2         1 2010-01-01 01:01:01   Chewie   15      Chewie   1
1   3         1 2015-01-01 01:01:01    Roger   10      Chewie   2
2   1         1 2018-01-01 01:01:01     Bear    5      Chewie   3
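With the window in place, collapsing back down to one row per group (the shape the original SQL was after) is just a filter on the row number. Here is a small sketch building on the pets DataFrame and window object above, also pulling in the last nickname per breed:

df_4 = (
    pets
    .withColumn(
        'first_breed',
        F.first('nickname').over(
            window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
        )
    )
    .withColumn(
        'last_breed',
        F.last('nickname').over(
            window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
        )
    )
    .withColumn('rn', F.row_number().over(window))
    .where(F.col('rn') == 1)                       # keep only the first row of each breed
    .select('breed_id', 'first_breed', 'last_breed')
)

df_4.toPandas()  # one row per breed_id: first_breed = 'Chewie', last_breed = 'Bear'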

Summary

Ok so my example didn't actually break locally lol (with everything in one small partition the order happens to hold), but trust me: an orderBy() followed by a groupBy() does not guarantee that the order is maintained!

reference: https://stackoverflow.com/a/50012355

For any aggregation that depends on row ordering (i.e. first, last, etc.), we should avoid groupBy() and use a window object instead.
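The same idea can also be written directly in Spark SQL with a named window. This is a hedged sketch that assumes the pets DataFrame above has been registered as a temp view:

pets.createOrReplaceTempView('pets')

spark.sql("""
    select distinct
        breed_id,
        first(nickname) over w as first_breed,
        last(nickname)  over w as last_breed
    from pets
    window w as (
        partition by breed_id
        order by birthday
        rows between unbounded preceding and unbounded following
    )
""").toPandas()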
